/**
 * Copyright 2020 Alibaba Group Holding Limited.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.maxgraph.compiler.step;

import static com.google.common.base.Preconditions.checkNotNull;

import com.alibaba.maxgraph.structure.graph.TinkerMaxGraph;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.structure.*;
import org.apache.tinkerpop.gremlin.structure.io.GraphReader;
import org.apache.tinkerpop.gremlin.structure.io.GraphWriter;
import org.apache.tinkerpop.gremlin.structure.io.Mapper;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.Host;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraphGryoSerializer;
import org.apache.tinkerpop.shaded.kryo.Kryo;
import org.apache.tinkerpop.shaded.kryo.io.Input;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.function.Function;

public class MaxGraphGryoReader implements GraphReader {
    private final Kryo kryo;
    private final Map<GraphFilter, StarGraphGryoSerializer> graphFilterCache = new HashMap<>();

    private final long batchSize;

    private MaxGraphGryoReader(final long batchSize, final Mapper<Kryo> gryoMapper) {
        this.kryo = gryoMapper.createMapper();
        this.batchSize = batchSize;
    }

    /**
     * Read data into a {@link Graph} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream    a stream containing an entire graph of vertices and edges as defined by the accompanying
     *                       {@link GraphWriter#writeGraph(OutputStream, Graph)}.
     * @param graphToWriteTo the graph to write to when reading from the stream.
     * @throws IOException
     */
    @Override
    public void readGraph(final InputStream inputStream, final Graph graphToWriteTo)
            throws IOException {
        if (!(graphToWriteTo instanceof TinkerMaxGraph)) {
            throw new IOException("Only support snapshot maxgraph here.");
        }
        TinkerMaxGraph tinkerMaxGraph = (TinkerMaxGraph) graphToWriteTo;
        Transaction tx = tinkerMaxGraph.tx();
        tx.open();

        Input input = new Input(inputStream);
        // insert all vertices
        List<StarGraph.StarVertex> starVertexList = Lists.newArrayList();
        while (!input.eof()) {
            readHeader(input);
            final StarGraph starGraph = kryo.readObject(input, StarGraph.class);
            // read the terminator
            kryo.readClassAndObject(input);
            StarGraph.StarVertex starVertex = starGraph.getStarVertex();
            Iterator<VertexProperty<Object>> propertyIterator = starVertex.properties();
            List<Object> keyValues = Lists.newArrayList();
            keyValues.add(T.label);
            keyValues.add(starVertex.label());
            keyValues.add("id");
            keyValues.add(Long.parseLong(starVertex.id().toString()));
            while (propertyIterator.hasNext()) {
                VertexProperty<Object> vertexProperty = propertyIterator.next();
                keyValues.add(vertexProperty.key());
                keyValues.add(vertexProperty.value());
            }
            tinkerMaxGraph.addVertexAsync(keyValues.toArray());
            starVertexList.add(starVertex);
        }
        tx.commit();

        // query the inserted vertices
        Iterator<Vertex> vertexIterator = graphToWriteTo.vertices();
        Map<Integer, Vertex> insertedVertexMap = Maps.newHashMap();
        while (vertexIterator.hasNext()) {
            Vertex vertex = vertexIterator.next();
            insertedVertexMap.put(((Long) vertex.property("id").value()).intValue(), vertex);
        }

        // insert all the edges
        for (StarGraph.StarVertex starVertex : starVertexList) {
            Iterator<Edge> edgeIterator = starVertex.edges(Direction.OUT);
            while (edgeIterator.hasNext()) {
                Edge edge = edgeIterator.next();
                Vertex srcVertex = edge.outVertex();
                Vertex dstVertex = edge.inVertex();
                Vertex insertedSrcVertex = insertedVertexMap.get(srcVertex.id());
                Vertex insertedDstVertex = insertedVertexMap.get(dstVertex.id());
                checkNotNull(insertedSrcVertex, "src vertex cant be null for edge " + edge);
                checkNotNull(insertedDstVertex, "dst vertex cant be null for edge " + edge);
                List<Object> keyValues = Lists.newArrayList();
                Iterator<Property<Object>> propertyIterator = edge.properties();
                keyValues.add("id");
                keyValues.add(edge.id());
                while (propertyIterator.hasNext()) {
                    Property<Object> property = propertyIterator.next();
                    keyValues.add(property.key());
                    keyValues.add(property.value());
                }
                tinkerMaxGraph.addEdgeAsync(
                        insertedSrcVertex, insertedDstVertex, edge.label(), keyValues.toArray());
            }
        }
        tx.commit();
    }

    /**
     * Read {@link Vertex} objects from output generated by any of the {@link GryoWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream                a stream containing at least one {@link Vertex} as defined by the accompanying
     *                                   {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} or
     *                                   {@link GraphWriter#writeVertices(OutputStream, Iterator)} methods.
     * @param vertexAttachMethod         a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
     * @param edgeAttachMethod           a function that creates re-attaches a {@link Edge} to a {@link Host} object.
     * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
     */
    @Override
    public Iterator<Vertex> readVertices(
            final InputStream inputStream,
            final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
            final Function<Attachable<Edge>, Edge> edgeAttachMethod,
            final Direction attachEdgesOfThisDirection)
            throws IOException {
        throw new UnsupportedOperationException("Not support");
    }

    @Override
    public Optional<Vertex> readVertex(final InputStream inputStream, final GraphFilter graphFilter)
            throws IOException {
        StarGraphGryoSerializer serializer = this.graphFilterCache.get(graphFilter);
        if (null == serializer) {
            serializer = StarGraphGryoSerializer.withGraphFilter(graphFilter);
            this.graphFilterCache.put(graphFilter, serializer);
        }
        final Input input = new Input(inputStream);
        this.readHeader(input);
        final StarGraph starGraph = this.kryo.readObject(input, StarGraph.class, serializer);
        // read the terminator
        this.kryo.readClassAndObject(input);
        return Optional.ofNullable(starGraph == null ? null : starGraph.getStarVertex());
    }

    /**
     * Read a {@link Vertex}  from output generated by any of the {@link GryoWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream        a stream containing at least a single vertex as defined by the accompanying
     *                           {@link GraphWriter#writeVertex(OutputStream, Vertex)}.
     * @param vertexAttachMethod a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
     */
    @Override
    public Vertex readVertex(
            final InputStream inputStream,
            final Function<Attachable<Vertex>, Vertex> vertexAttachMethod)
            throws IOException {
        return readVertex(inputStream, vertexAttachMethod, null, null);
    }

    /**
     * Read a {@link Vertex} from output generated by any of the {@link GryoWriter} {@code writeVertex} or
     * {@code writeVertices} methods or by {@link GryoWriter#writeGraph(OutputStream, Graph)}.
     *
     * @param inputStream                a stream containing at least one {@link Vertex} as defined by the accompanying
     *                                   {@link GraphWriter#writeVertices(OutputStream, Iterator, Direction)} method.
     * @param vertexAttachMethod         a function that creates re-attaches a {@link Vertex} to a {@link Host} object.
     * @param edgeAttachMethod           a function that creates re-attaches a {@link Edge} to a {@link Host} object.
     * @param attachEdgesOfThisDirection only edges of this direction are passed to the {@code edgeMaker}.
     */
    @Override
    public Vertex readVertex(
            final InputStream inputStream,
            final Function<Attachable<Vertex>, Vertex> vertexAttachMethod,
            final Function<Attachable<Edge>, Edge> edgeAttachMethod,
            final Direction attachEdgesOfThisDirection)
            throws IOException {
        throw new UnsupportedOperationException("Not support");
    }

    /**
     * Read a {@link VertexProperty} from output generated by
     * {@link GryoWriter#writeVertexProperty(OutputStream, VertexProperty)} or via an {@link VertexProperty} passed
     * to {@link GryoWriter#writeObject(OutputStream, Object)}.
     *
     * @param inputStream                a stream containing at least one {@link VertexProperty} as written by the accompanying
     *                                   {@link GraphWriter#writeVertexProperty(OutputStream, VertexProperty)} method.
     * @param vertexPropertyAttachMethod a function that creates re-attaches a {@link VertexProperty} to a
     *                                   {@link Host} object.
     */
    @Override
    public VertexProperty readVertexProperty(
            final InputStream inputStream,
            final Function<Attachable<VertexProperty>, VertexProperty> vertexPropertyAttachMethod)
            throws IOException {
        final Input input = new Input(inputStream);
        readHeader(input);
        final Attachable<VertexProperty> attachable =
                kryo.readObject(input, DetachedVertexProperty.class);
        return vertexPropertyAttachMethod.apply(attachable);
    }

    /**
     * Read an {@link Edge} from output generated by {@link GryoWriter#writeEdge(OutputStream, Edge)} or via
     * an {@link Edge} passed to {@link GryoWriter#writeObject(OutputStream, Object)}.
     *
     * @param inputStream      a stream containing at least one {@link Edge} as defined by the accompanying
     *                         {@link GraphWriter#writeEdge(OutputStream, Edge)} method.
     * @param edgeAttachMethod a function that creates re-attaches a {@link Edge} to a {@link Host} object.
     */
    @Override
    public Edge readEdge(
            final InputStream inputStream, final Function<Attachable<Edge>, Edge> edgeAttachMethod)
            throws IOException {
        final Input input = new Input(inputStream);
        readHeader(input);
        final Attachable<Edge> attachable = kryo.readObject(input, DetachedEdge.class);
        return edgeAttachMethod.apply(attachable);
    }

    /**
     * Read a {@link Property} from output generated by  {@link GryoWriter#writeProperty(OutputStream, Property)} or
     * via an {@link Property} passed to {@link GryoWriter#writeObject(OutputStream, Object)}.
     *
     * @param inputStream          a stream containing at least one {@link Property} as written by the accompanying
     *                             {@link GraphWriter#writeProperty(OutputStream, Property)} method.
     * @param propertyAttachMethod a function that creates re-attaches a {@link Property} to a {@link Host} object.
     */
    @Override
    public Property readProperty(
            final InputStream inputStream,
            final Function<Attachable<Property>, Property> propertyAttachMethod)
            throws IOException {
        final Input input = new Input(inputStream);
        readHeader(input);
        final Attachable<Property> attachable = kryo.readObject(input, DetachedProperty.class);
        return propertyAttachMethod.apply(attachable);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <C> C readObject(final InputStream inputStream, final Class<? extends C> clazz)
            throws IOException {
        return clazz.cast(this.kryo.readClassAndObject(new Input(inputStream)));
    }

    private void readHeader(final Input input) throws IOException {
        if (!Arrays.equals(GryoMapper.GIO, input.readBytes(3)))
            throw new IOException(
                    "Invalid format - first three bytes of header do not match expected value");

        // skip the next 13 bytes - for future use
        input.readBytes(13);
    }

    public static Builder build() {
        return new Builder();
    }

    public static final class Builder implements ReaderBuilder<MaxGraphGryoReader> {

        private long batchSize = 10000;
        /**
         * Always use the most recent gryo version by default
         */
        private Mapper<Kryo> gryoMapper = GryoMapper.build().create();

        private Builder() {}

        /**
         * Number of mutations to perform before a commit is executed when using
         * {@link GryoReader#readGraph(InputStream, Graph)}.
         */
        public Builder batchSize(final long batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /**
         * Supply a mapper {@link GryoMapper} instance to use as the serializer for the {@code KryoWriter}.
         */
        public Builder mapper(final Mapper<Kryo> gryoMapper) {
            this.gryoMapper = gryoMapper;
            return this;
        }

        public MaxGraphGryoReader create() {
            return new MaxGraphGryoReader(batchSize, this.gryoMapper);
        }
    }
}
